package com.zuikaku.sink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Redis mapper for {@code Tuple2<String, Integer>} records.
 *
 * <p>Each record is written with the Redis {@code HSET} command into the hash
 * named {@code ORDER_ITEM_COUNTER}: the tuple's first field supplies the key
 * inside the hash and the tuple's second field supplies the value.
 */
public class MyRedisSink implements RedisMapper<Tuple2<String,Integer>> {

    /** Name of the Redis hash passed to the command description as the additional key. */
    private static final String HASH_NAME = "ORDER_ITEM_COUNTER";

    /**
     * Describes the Redis command used for every record.
     *
     * @return a description pairing {@code HSET} with the hash name
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        final RedisCommandDescription description =
                new RedisCommandDescription(RedisCommand.HSET, HASH_NAME);
        return description;
    }

    /**
     * Extracts the key to use inside the hash.
     *
     * @param item the record being written
     * @return the tuple's first field ({@code f0})
     */
    @Override
    public String getKeyFromData(Tuple2<String, Integer> item) {
        final String hashField = item.f0;
        return hashField;
    }

    /**
     * Extracts the value to store under the key inside the hash.
     *
     * @param item the record being written
     * @return the string form of the tuple's second field ({@code f1})
     */
    @Override
    public String getValueFromData(Tuple2<String, Integer> item) {
        final Integer count = item.f1;
        return count.toString();
    }
}
